package cn.tiakon.dmp.etl

import cn.tiakon.dmp.untils.{ContextUtils, Utils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

/**
  * Converts raw comma-separated log files (.logs) into Parquet files.
  *
  * Usage: `Logs2Parquet <inputDataPath> <outDataPath>`
  *
  * Each input line is split on `,` and mapped onto [[LogSchema]]. Lines with fewer
  * columns than the schema are dropped. The output directory is deleted (via the
  * project helper `Utils.deleteFileByCoreSite`) before the Parquet files are written.
  *
  * @author Tiakon
  *         2018/3/28 19:49
  */
object Logs2Parquet {

  /**
    * Schema of one log record.
    *
    * Column i of a split input line maps to field i of this schema. The number of
    * fields (85) is also the minimum column count a line needs in order to be kept.
    */
  private val LogSchema: StructType = StructType(
    Seq(
      StructField("sessionid", StringType),
      StructField("advertisersid", IntegerType),
      StructField("adorderid", IntegerType),
      StructField("adcreativeid", IntegerType),
      StructField("adplatformproviderid", IntegerType),
      StructField("sdkversion", StringType),
      StructField("adplatformkey", StringType),
      StructField("putinmodeltype", IntegerType),
      StructField("requestmode", IntegerType),
      StructField("adprice", DoubleType),
      StructField("adppprice", DoubleType),
      StructField("requestdate", StringType),
      StructField("ip", StringType),
      StructField("appid", StringType),
      StructField("appname", StringType),
      StructField("uuid", StringType),
      StructField("device", StringType),
      StructField("client", IntegerType),
      StructField("osversion", StringType),
      StructField("density", StringType),
      StructField("pw", IntegerType),
      StructField("ph", IntegerType),
      StructField("long", StringType),
      StructField("lat", StringType),
      StructField("provincename", StringType),
      StructField("cityname", StringType),
      StructField("ispid", IntegerType),
      StructField("ispname", StringType),
      StructField("networkmannerid", IntegerType),
      StructField("networkmannername", StringType),
      StructField("iseffective", IntegerType),
      StructField("isbilling", IntegerType),
      StructField("adspacetype", IntegerType),
      StructField("adspacetypename", StringType),
      StructField("devicetype", IntegerType),
      StructField("processnode", IntegerType),
      StructField("apptype", IntegerType),
      StructField("district", StringType),
      StructField("paymode", IntegerType),
      StructField("isbid", IntegerType),
      StructField("bidprice", DoubleType),
      StructField("winprice", DoubleType),
      StructField("iswin", IntegerType),
      StructField("cur", StringType),
      StructField("rate", DoubleType),
      StructField("cnywinprice", DoubleType),
      StructField("imei", StringType),
      StructField("mac", StringType),
      StructField("idfa", StringType),
      StructField("openudid", StringType),
      StructField("androidid", StringType),
      StructField("rtbprovince", StringType),
      StructField("rtbcity", StringType),
      StructField("rtbdistrict", StringType),
      StructField("rtbstreet", StringType),
      StructField("storeurl", StringType),
      StructField("realip", StringType),
      StructField("isqualityapp", IntegerType),
      StructField("bidfloor", DoubleType),
      StructField("aw", IntegerType),
      StructField("ah", IntegerType),
      StructField("imeimd5", StringType),
      StructField("macmd5", StringType),
      StructField("idfamd5", StringType),
      StructField("openudidmd5", StringType),
      StructField("androididmd5", StringType),
      StructField("imeisha1", StringType),
      StructField("macsha1", StringType),
      StructField("idfasha1", StringType),
      StructField("openudidsha1", StringType),
      StructField("androididsha1", StringType),
      StructField("uuidunknow", StringType),
      StructField("userid", StringType),
      StructField("iptype", IntegerType),
      StructField("initbidprice", DoubleType),
      StructField("adpayment", DoubleType),
      StructField("agentrate", DoubleType),
      StructField("lomarkrate", DoubleType),
      StructField("adxrate", DoubleType),
      StructField("title", StringType),
      StructField("keywords", StringType),
      StructField("tagid", StringType),
      StructField("callbackdate", StringType),
      StructField("channelid", StringType),
      StructField("mediatype", IntegerType)
    )
  )

  /**
    * Converts one raw column value to the type declared by its schema field.
    *
    * `IntegerType` / `DoubleType` columns go through the project helpers
    * `Utils.str2Int` / `Utils.str2Double`. Every other column is kept as the raw string.
    * NOTE(review): how those helpers treat malformed or empty input is defined in
    * `Utils`, which is not shown here.
    *
    * @param field schema field describing the target column
    * @param raw   raw text of that column from the split input line
    * @return the converted value to place in the Row
    */
  private def convertField(field: StructField, raw: String): Any = field.dataType match {
    case IntegerType => Utils.str2Int(raw)
    case DoubleType  => Utils.str2Double(raw)
    case _           => raw
  }

  /**
    * Builds a Row from a split log line.
    *
    * Column i is converted according to `LogSchema` field i. Columns beyond the
    * schema's length are ignored. Callers must ensure
    * `fields.length >= LogSchema.length`; `main` filters on this before calling.
    *
    * @param fields columns of one input line, split on `,`
    * @return a Row whose values line up positionally with `LogSchema`
    */
  private def toRow(fields: Array[String]): Row =
    Row.fromSeq(LogSchema.fields.toSeq.zip(fields.toSeq).map { case (field, raw) => convertField(field, raw) })

  /**
    * Entry point.
    *
    * @param args exactly two arguments: the input path of the raw logs and the output
    *             path for the Parquet files. Any other argument count prints a usage
    *             message and exits with status 1.
    */
  def main(args: Array[String]): Unit = {
    // Validate arguments before starting Spark, so a bad invocation never
    // creates (and then abandons) a SparkContext.
    if (args.length != 2) {
      Console.err.println(
        """
          |
          |参数异常
          |inputDataPath、outDataPath
          |
        """.stripMargin)
      sys.exit(1)
    }

    val Array(inputDataPath, outDataPath) = args

    val sc = ContextUtils.getSparkContext()
    try {
      val sqlContext: SQLContext = new SQLContext(sc)

      // Minimum number of columns a line must have. It is derived from the schema,
      // so the filter and the row conversion cannot drift apart.
      val fieldCount: Int = LogSchema.length

      // split(",", -1) keeps trailing empty strings, so empty columns still count
      // toward the column total. Lines shorter than the schema are dropped.
      val rowRdd: RDD[Row] = sc.textFile(inputDataPath)
        .map(_.split(",", -1))
        .filter(_.length >= fieldCount)
        .map(toRow)

      // DataFrame = schema + RDD[Row]
      val dataFrame: DataFrame = sqlContext.createDataFrame(rowRdd, LogSchema)
      dataFrame.registerTempTable("logs")

      // Clear any previous output once, right before writing.
      Utils.deleteFileByCoreSite(sc, outDataPath)
      dataFrame.write.parquet(outDataPath)

      // Print a sample of the registered table as a sanity check.
      val selected: DataFrame = sqlContext.sql("select * from logs")
      selected.show()
    } finally {
      // Always release the SparkContext, even when reading or writing fails.
      sc.stop()
    }
  }
}
